/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.quartz.impl;

import com.chinamobile.cmss.lakehouse.common.utils.ServiceException;
import com.chinamobile.cmss.lakehouse.service.quartz.QuartzExecutor;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * {@link QuartzExecutor} implementation that registers and removes jobs on the injected
 * Quartz {@link Scheduler}.
 *
 * <p>Every public method holds the same write lock for its whole duration, so calls made
 * through one instance of this class never interleave.
 */
@Component
@Slf4j
public class QuartzExecutorImpl implements QuartzExecutor {

    /** Job data key under which the job name is stored for newly created jobs. */
    private static final String TASK_ID_KEY = "task_id";

    /** Only the write lock is ever taken, so this acts as a plain mutex. */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    @Autowired
    private Scheduler scheduler;

    /**
     * Registers a cron-driven job, or updates the cron expression of an existing one.
     *
     * <p>The trigger uses the same name and group as the job. If a trigger with that key is
     * already stored it is rescheduled only when its cron expression differs (compared
     * case-insensitively) or when it is not a cron trigger. If only the job is stored, the new
     * trigger is attached to it. Otherwise a new job is created and scheduled.
     *
     * @param clazz          job implementation class
     * @param jobName        job name, also used as trigger name, description and task id
     * @param jobGroupName   job group, also used as trigger group
     * @param cronExpression Quartz cron expression
     * @throws ServiceException if the expression is invalid or the scheduler rejects the request
     */
    @Override
    public void addJob(Class<? extends Job> clazz, String jobName, String jobGroupName, String cronExpression) {
        lock.writeLock().lock();
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
            // Bind the trigger to the job key up front so it can also be scheduled on its own
            // against a job that is already stored.
            CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity(triggerKey)
                .forJob(jobKey)
                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                .build();

            Trigger existingTrigger = scheduler.getTrigger(triggerKey);
            if (existingTrigger != null) {
                // A non-cron trigger under the same key is simply replaced instead of
                // failing on a blind cast.
                boolean sameSchedule = existingTrigger instanceof CronTrigger
                    && StringUtils.equalsIgnoreCase(cronExpression,
                        ((CronTrigger) existingTrigger).getCronExpression());
                if (!sameSchedule) {
                    // reschedule job trigger
                    scheduler.rescheduleJob(triggerKey, cronTrigger);
                    log.info("reschedule job trigger with triggerName: {}, triggerGroupName: {}, cronExpression: {}",
                        jobName, jobGroupName, cronExpression);
                }
            } else if (scheduler.checkExists(jobKey)) {
                // scheduleJob(JobDetail, Trigger) refuses a job key that is already stored,
                // so only the trigger is added here.
                scheduler.scheduleJob(cronTrigger);
                log.info("schedule job trigger with triggerName: {}, triggerGroupName: {}, cronExpression: {},",
                    jobName, jobGroupName, cronExpression);
            } else {
                JobDetail jobDetail = newJobDetail(clazz, jobKey, jobName, null);
                log.info("add job with name: {}, group name: {}", jobName, jobGroupName);
                scheduler.scheduleJob(jobDetail, cronTrigger);
                log.info("schedule job trigger with triggerName: {}, triggerGroupName: {}, cronExpression: {},",
                    jobName, jobGroupName, cronExpression);
            }
        } catch (Exception e) {
            throw new ServiceException("add job failed", e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Registers a job fired by a caller-supplied trigger.
     *
     * <p>If the job is already stored, the trigger is attached to the stored job and
     * {@code jobDataMap} is not applied. Otherwise a new job is created with
     * {@code jobDataMap} (when non-null) and scheduled together with the trigger.
     * The task id entry is written into the supplied map instance.
     *
     * @param clazz        job implementation class
     * @param jobName      job name, also used as description and task id
     * @param jobGroupName job group
     * @param trigger      trigger to schedule; if it already names a job, it must be this one
     * @param jobDataMap   initial job data for a newly created job, may be {@code null}
     * @throws ServiceException if the scheduler rejects the request, e.g. the trigger key is taken
     */
    @Override
    public void addJob(Class<? extends Job> clazz, String jobName, String jobGroupName, Trigger trigger, JobDataMap jobDataMap) {
        lock.writeLock().lock();
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            if (scheduler.checkExists(jobKey)) {
                // The job is already stored: scheduling it again together with the trigger
                // would be rejected, so only the trigger is added.
                scheduler.scheduleJob(bindToJob(trigger, jobKey));
            } else {
                JobDetail jobDetail = newJobDetail(clazz, jobKey, jobName, jobDataMap);
                log.info("add job with name: {}, group name: {}", jobName, jobGroupName);
                scheduler.scheduleJob(jobDetail, trigger);
            }
            log.info("schedule job trigger with triggerName: {}, triggerGroupName: {}, cronExpression: {},",
                    jobName, jobGroupName, trigger);
        } catch (Exception e) {
            throw new ServiceException("add job failed", e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Deletes the job identified by name and group.
     *
     * @param jobName      job name
     * @param jobGroupName job group
     * @return the scheduler's result: {@code true} if the job was found and deleted
     * @throws ServiceException if the scheduler fails while deleting
     */
    @Override
    public boolean deleteJob(String jobName, String jobGroupName) {
        lock.writeLock().lock();
        try {
            return scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
        } catch (Exception e) {
            throw new ServiceException("delete job failed", e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Builds a new job detail described by its name and tagged with the task id entry. */
    private JobDetail newJobDetail(Class<? extends Job> clazz, JobKey jobKey, String jobName, JobDataMap jobDataMap) {
        JobBuilder jobBuilder = JobBuilder.newJob(clazz)
                .withIdentity(jobKey)
                .withDescription(jobName);
        if (null != jobDataMap) {
            jobBuilder.setJobData(jobDataMap);
        }
        JobDetail jobDetail = jobBuilder.build();
        jobDetail.getJobDataMap().put(TASK_ID_KEY, jobName);
        return jobDetail;
    }

    /** Returns a trigger that references {@code jobKey}, rebuilding it only when it names no job yet. */
    private static Trigger bindToJob(Trigger trigger, JobKey jobKey) {
        JobKey boundKey = trigger.getJobKey();
        if (boundKey == null) {
            return trigger.getTriggerBuilder().forJob(jobKey).build();
        }
        if (!boundKey.equals(jobKey)) {
            throw new IllegalArgumentException("Trigger does not reference given job!");
        }
        return trigger;
    }
}
